跳到主要内容

Go Channel 怎么使用?

三种状态和三种操作结果

操作空值(nil)非空已关闭非空未关闭
关闭panicpanic成功关闭
发送数据永久阻塞panic阻塞或成功发送
接收数据永久阻塞永不阻塞阻塞或者成功接收

1、空值(nil)关闭

// 
func main() {
var c chan int
close(c)
}
// panic: close of nil channel

2、非空已关闭再次关闭

func main() {
c := make(chan int)
close(c)
close(c)
}
// panic: close of closed channel

3、向已关闭的通道发送数据

func main() {
c := make(chan int)
close(c)
c <- 1
}
// panic: send on closed channel

通道缓冲区

通道可以设置缓冲区,通过 make 的第二个参数指定缓冲区大小:

ch := make(chan int, 100)

带缓冲区的通道允许发送端的数据发送和接收端的数据获取处于异步状态,就是说发送端发送的数据可以放在缓冲区里面,可以等待接收端去获取数据,而不是立刻需要接收端去获取数据。

不过由于缓冲区的大小是有限的,所以还是必须有接收端来接收数据的,否则缓冲区一满,数据发送端就无法再发送数据了。

注意:如果通道不带缓冲,发送方会阻塞直到接收方从通道中接收了值。如果通道带缓冲,发送方则会阻塞直到发送的值被拷贝到缓冲区内;如果缓冲区已满,则意味着需要等待直到某个接收方获取到一个值。接收方在有值可以接收之前会一直阻塞。

package main

import "fmt"

func main() {
// 这里我们定义了一个可以存储整数类型的带缓冲通道
// 缓冲区大小为2
ch := make(chan int, 2)

// 因为 ch 是带缓冲的通道,我们可以同时发送两个数据
// 而不用立刻需要去同步读取数据
ch <- 1
ch <- 2

// 获取这两个数据
fmt.Println(<-ch)
fmt.Println(<-ch)
}

只读/只写 channel(单向)

channel 可以通过参数传递,所谓单向 channel 只是对 channel 的一种使用限制,这跟C语言使用 const 修饰函数参数为只读是一个道理。

func readChan(chanName <-chan int) {
<- chanName
}

func writeChan(chanName chan<- int) {
chanName <- 1
}

func main() {
var mychan = make(chan int, 10)

writeChan(mychan)
readChan(mychan)
}

Go 关闭管道的作用

channel 其实有两个返回值

v, ok := <-ch

如果通道接收不到数据后 ok 就为 false。

func main() {
c := make(chan int)
go func() {
for i := 0; i < 5; i++ {
c <- i
}
// close 可以关闭一个 channel
close(c)
}()

for {
if data, ok := <-c; ok {
fmt.Println(data)
} else {
break
}
}

fmt.Println("Finished...")
}
备注

关闭 channel 后可以继续从 channel 接收数据;对于 nil channel 无论收发都会被阻塞(不使用 make 创建)。

// nil channel
var ch chan int

上面的代码可以简化为:

func main() {
c := make(chan int)
go func() {
for i := 0; i < 5; i++ {
c <- i
}
// close 可以关闭一个 channel
close(c)
}()

for data := range c {
fmt.Println(data)
}

fmt.Println("Finished...")
}

当 channel 被关闭时,循环会自动结束,如果没有关闭 channel,就必须使用 break 语句来跳出循环。

通道的发送和接收特性

这个通道其实有点像 Java 的阻塞队列,下面介绍下通道的特性:

1、对于同一个通道,发送操作之间是互斥的,接收操作之间也是互斥的。

简单来说就是在同一时刻,Go的运行系统只会执行对同一个通道的任意个发送操作中的某一个,直到这个元素值被完全 复制 进该通道之后,其他发送操作才会执行。针对接收操作也是这样。

对于通道中的同一个值,发送操作和接收操作也是互斥的。如正在被复制进通道但还未复制完成的元素值,这时接收方也不会看到和取走。

2、发送操作和接收操作中对元素值的处理都是不可分割的。 不可分割意思就是发送操作要么还没复制元素,要么已经复制完毕,不会出现值只复制了一部分的情况。

3、发送操作在完全完成之前会被阻塞。接收操作也是如此。

发送操作包括,“复制元素值”,“放置副本到通道内” 二个步骤。在这二个步骤完成之前,发送操作会一直阻塞,他之后的代码是不会执行的。

接收操作包括 “复制通道内元素值”,“放置副本到接收方”,“删除原值” 三个操作。这三个操作在完成之前也是会一直阻塞的。

什么时候会被阻塞呢

发送操作和接收操作在什么时候会被阻塞呢

对于缓存通道

  • 如果通道已满,所有的发送操作就会阻塞,直到通道中有元素被取走
  • 如果通道已空,所有的接收操作就会阻塞,直到通道中有新的元素

对于非缓存通道

  • 无论发送操作还是接受操作一开始就是阻塞的,只有配对的操作出现才会开始执行。所以当接收方或者发送方一方没了都会造成阻塞

收发操作何时会引起 panic

通道关闭,在进行发送操作会引发 panic;关闭一个已经关闭的通道也会引发 panic

package main
import "fmt"
func main() {
ch1 := make(chan int, 2)
// 发送方。
go func() {
for i := 0; i < 10; i++ {
fmt.Printf("Sender: sending element %v...\n", i)
ch1 <- i
}
fmt.Println("Sender: close the channel...")
close(ch1)
}()

// 接收方。
for {
elem, ok := <- ch1
if !ok {
fmt.Println("Receiver: closed channel")
break
}
fmt.Printf("Receiver: received an element: %v\n", elem)
}

fmt.Println("End.")
}

Channel 引起的死锁场景

1、场景1:一个通道在同一个 go 协程读写

func main() {
c := make(chan int)
c <- 666
<- c // fatal error: all goroutines are asleep - deadlock!
}

2、场景二:go 程开启之前使用通道

func main() {
c := make(chan int)
c <- 666
go func() {
<-c
}()
}

3、场景三:通道1中调用了通道2,通道2中调用通道1

func main() {
c1, c2 := make(chan int), make(chan int)
go func() {
for {
select {
case <-c1:
c2 <- 10
}
}
}()

for {
select {
case <-c2:
c1 <- 10
}
}
}

select 语句

select 就是用来监听和 channel 有关的 IO 操作,当 IO 操作发生时,触发相应的动作。

Go 编程语言中 select 语句的语法如下:

select {
case <-ch1:
// 如果从 ch1 信道成功接收数据,则执行该分支代码
case ch2 <- 1:
// 如果成功向 ch2 信道成功发送数据,则执行该分支代码
default:
// 如果上面都没有成功,则进入 default 分支处理流程
}

select 也可以用于同时监听多个 channel,如下所示:

package main

import (
"fmt"
)

func main() {
ch1 := make(chan int)
ch2 := make(chan int)

go func() {
// time.Sleep(time.Second)
ch1 <- 1
}()

go func() {
ch2 <- 3
}()
// 这里只执行一次
select {
case i := <-ch1:
fmt.Printf("从 ch1 读取了数据 %d", i)
case j := <-ch2:
fmt.Printf("从 ch2 读取了数据 %d", j)
}
}

输出:

从 ch2 读取了数据 3

select 中的 case 语句是随机执行的,并没有顺序,且只执行一次,所以上面打印完 ch2 后就结束了(main 结束了其它的子协程都会结束)

Go 语言 select 阻塞

很多时候我们需要让 main 函数不退出,让它在后台一直执行,例如:

func main() {
for i := 0; i < 20; i++ { //启动20个协程处理消息队列中的消息
c := consumer.New()
go c.Start()
}
select {} // 阻塞
}

channel 忘记关闭的陷阱

除了超时场景,其他使用协程(goroutine)的场景,也很容易因为实现不当,导致协程无法退出,随着时间的积累,造成内存耗尽,程序崩溃。

例如下面的例子:

func do(taskCh chan int) {
for {
select {
case t := <-taskCh:
time.Sleep(time.Millisecond)
fmt.Printf("task %d is done\n", t)
}
}
}

func sendTasks() {
taskCh := make(chan int, 10)
go do(taskCh)
for i := 0; i < 1000; i++ {
taskCh <- i
}
}

func TestDo(t *testing.T) {
t.Log(runtime.NumGoroutine())
sendTasks()
time.Sleep(time.Second)
t.Log(runtime.NumGoroutine())
}
  • do 的实现非常简单,for + select 的模式,等待信道 taskCh 传递任务,并执行。
  • sendTasks 模拟向信道中发送任务。

该用例执行结果如下:

$ go test . -v
--- PASS: TestDo (2.34s)
exit_test.go:29: 2
exit_test.go:32: 3

单元测试执行结束后,子协程多了一个,也就是说,有一个协程一直没有得到释放。我们仔细看代码,很容易发现 sendTasks 中启动了一个子协程 go do(taskCh),因为这个协程一直处于阻塞状态,等待接收任务,因此直到程序结束,协程也没有释放。

如果任务全部发送成功,我们如何通知该协程结束等待,正常退出呢?

接收操作可以有 2 个返回值。

v, beforeClosed := <-ch

beforeClosed 代表 v 是否是信道关闭前发送的。

  • true 代表是信道关闭前发送的,同时管道里没有数据
  • false 代表信道已经关闭。

如果一个信道已经关闭,<-ch 将永远不会发生阻塞,但是我们可以通过第二个返回值 beforeClosed 得知信道已经关闭,作出相应的处理。

修改成如下:

func doCheckClose(taskCh chan int) {
for {
select {
case t, beforeClosed := <-taskCh:
if !beforeClosed {
fmt.Println("taskCh has been closed")
return
}

time.Sleep(time.Millisecond)
fmt.Printf("task %d is done\n", t)
}
}
}

func sendTasksCheckClose() {
taskCh := make(chan int, 10)
go doCheckClose(taskCh)
for i := 0; i < 1000; i++ {
taskCh <- i
}
close(taskCh) // ⭐
}

func TestDoCheckClose(t *testing.T) {
t.Log(runtime.NumGoroutine())
sendTasksCheckClose()
time.Sleep(time.Second)
runtime.GC()
t.Log(runtime.NumGoroutine())
}

sendTasks 函数中,任务发送结束之后,使用 close(taskCh) 将 channel taskCh 关闭。

$ go test -run=TestDoCheckClose -v
task 999 is done
taskCh has been closed
--- PASS: TestDoCheckClose (2.34s)
exit_test.go:59: 2
exit_test.go:63: 2

可以发现,启动的协程已经正常退出,该协程以及使用到的信道 taskCh 将被垃圾回收,资源得到释放。

关于通道和协程的垃圾回收

注意,一个通道被其 发送数据协程队列接收数据协程队列 中的所有协程引用着。 因此, 如果一个通道的这两个队列只要有一个不为空,则此通道肯定不会被垃圾回收。 另一方面,如果一个协程处于一个通道的某个协程队列之中,则此协程也肯定不会被垃圾回收,即使此通道仅被此协程所引用。事实上,一个协程只有在退出后才能被垃圾回收。

通道关闭原则

一个常用的使用 Go 通道的原则是不要在数据接收方或者在有多个发送者的情况下关闭通道。换句话说,我们只应该让 一个通道唯一的发送者关闭此通道。

粗鲁的方式(非常不推荐)

如果 channel 已经被关闭,再次关闭会产生 panic,这时通过 recover 使程序恢复正常。

func SafeClose(ch chan T) (justClosed bool) {
defer func() {
if recover() != nil {
// 一个函数的返回结果可以在 defer 调用中修改。
justClosed = false
}
}()

// 假设ch != nil。
close(ch) // 如果 ch 已关闭,将 panic
return true // <=> justClosed = true; return
}

礼貌的方式

使用 sync.Once 或互斥锁 sync.Mutex 确保 channel 只被关闭一次。

type MyChannel struct {
C chan T
once sync.Once
}

func NewMyChannel() *MyChannel {
return &MyChannel{C: make(chan T)}
}

func (mc *MyChannel) SafeClose() {
mc.once.Do(func() {
close(mc.C)
})
}

优雅的方式

  • 情形一:M 个接收者和一个发送者,发送者通过关闭用来传输数据的通道来传递发送结束信号。
  • 情形二:一个接收者和 N 个发送者,此唯一接收者通过关闭一个额外的信号通道来通知发送者不要再发送数据了。
  • 情形三:M 个接收者和 N 个发送者,它们中的任何协程都可以让一个中间调解协程帮忙发出停止数据传送的信号。

Reference

Golang通道Channel详解 极客兔兔-如何退出协程 goroutine (其他场景)